Kafka Cluster Setup (ZooKeeper Mode and KRaft Mode)


Preface

Environment

Virtualization software: VirtualBox

Linux distribution: Ubuntu 20.04.4

VM cores: 1 core

VM memory: 2 GB

JDK version: 1.8.0_202

ZooKeeper version: 3.8.0

Kafka version: 3.2.0

Kafka - ZooKeeper Mode

Before Kafka 2.8.0, all cluster metadata was stored in ZooKeeper.

ZooKeeper became a bottleneck for Kafka. Starting with 2.8.0, metadata can instead be stored in Kafka itself, removing the ZooKeeper dependency.

Cluster plan:

| node01 | node02 | node03 |
| ------ | ------ | ------ |
| zk     | zk     | zk     |
| kafka  | kafka  | kafka  |

ZooKeeper Cluster Deployment

Refer to the ZooKeeper cluster setup in "Hadoop HA Setup".

Kafka Cluster Deployment

Kafka Environment Variables

```shell
$ vim /etc/profile
# append at the end:
export KAFKA_HOME=/opt/kafka-3.2.0
export PATH=$PATH:$KAFKA_HOME/bin

$ xsync $KAFKA_HOME
$ xsync /etc/profile
```

Configure server.properties

```shell
$ vim $KAFKA_HOME/config/server.properties
```

```properties
############################# Server Basics #############################

# Globally unique broker ID; must not repeat
broker.id=0

############################# Socket Server Settings #############################

# Number of threads handling network requests
num.network.threads=3

# Number of I/O threads
num.io.threads=8

# Send buffer size of the socket server
socket.send.buffer.bytes=102400

# Receive buffer size of the socket server
socket.receive.buffer.bytes=102400

# Maximum request size the socket server accepts
socket.request.max.bytes=104857600

############################# Log Basics #############################

# Kafka log (data) directory
log.dirs=/tmp/kafka-logs

# Number of partitions per topic on this broker
num.partitions=1

# Threads per data directory for recovery and cleanup
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################

# Replication factor for internal topics
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Retention Policy #############################

# Maximum time to retain a log file
log.retention.hours=168

# Maximum size of a single log segment
log.segment.bytes=1073741824

# Interval for checking expired data
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# ZooKeeper connection string, and the chroot where metadata is stored
zookeeper.connect=node01:2181,node02:2181,node03:2181/kafka

# ZooKeeper connection timeout
zookeeper.connection.timeout.ms=18000

############################# Group Coordinator Settings #############################

group.initial.rebalance.delay.ms=0
```

```shell
$ xsync $KAFKA_HOME/config/server.properties
```

Note: each machine's broker.id must be different; set node01 ~ node03 to 1, 2, and 3 respectively.
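Since only `broker.id` differs between nodes, the per-node edit after distribution can be scripted. A minimal sketch, assuming (as in this guide) that hostnames end in the node number; the `/tmp` file below is a stand-in for the real `$KAFKA_HOME/config/server.properties`:

```shell
# Illustrative only: derive the broker id from the trailing digits of the
# hostname and patch it into a copy of server.properties.
printf 'broker.id=0\nlog.dirs=/tmp/kafka-logs\n' > /tmp/server.properties  # stand-in config
host=node02                                  # in practice: host=$(hostname)
id=$((10#${host##*[!0-9]}))                  # strip the non-digit prefix: node02 -> 2
sed -i "s/^broker\.id=.*/broker.id=${id}/" /tmp/server.properties
grep '^broker.id=' /tmp/server.properties    # prints: broker.id=2
```

Run once on each node after `xsync`, pointing at the real config path.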

Start the Kafka Cluster

```shell
# start
$ xcall kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
# stop
$ xcall kafka-server-stop.sh
```

Note: when shutting down, make sure every Kafka broker process has fully exited before stopping the ZooKeeper cluster. ZooKeeper holds the Kafka cluster's metadata; if ZooKeeper goes down first, the Kafka brokers can no longer complete a clean shutdown, and the only option left is to kill the Kafka processes manually.

Verification

```shell
# create a topic
$ kafka-topics.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --create --partitions 3 --replication-factor 3 --topic hello
# start a console producer
$ kafka-console-producer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic hello
# start a console consumer
$ kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic hello
```

If the consumer receives a message sent by the producer, the deployment succeeded.

Kafka - KRaft Mode

Starting with 2.8.0, Kafka can run without ZooKeeper, which brings several benefits:

- Kafka no longer depends on an external system and can run on its own.
- The controller no longer has to read state from ZooKeeper first when managing the cluster, improving performance.
- Without the ZooKeeper dependency, cluster scaling is no longer limited by ZooKeeper's read/write capacity.
- The controller is no longer elected dynamically but fixed in the configuration, so controller nodes can be deliberately provisioned with stronger hardware, instead of being unable to plan for the high load landing on a randomly elected controller.

Cluster plan:

| node01           | node02           | node03           |
| ---------------- | ---------------- | ---------------- |
| kafka-controller | kafka-controller | kafka-controller |
| kafka-broker     | kafka-broker     | kafka-broker     |

Kafka Cluster Deployment

Kafka Environment Variables

Same as above.

Configure server.properties

Note: this server.properties is not the one used above — it lives in the kraft subdirectory.

```shell
$ vim $KAFKA_HOME/config/kraft/server.properties
```

```properties
############################# Server Basics #############################

# The roles this node plays
process.roles=broker,controller

# Globally unique node ID; remember to change it after distributing
node.id=1

# The controller quorum, replacing the former ZooKeeper list
controller.quorum.voters=1@node01:9093,2@node02:9093,3@node03:9093

############################# Socket Server Settings #############################

# The address the socket server listens on.
# Combined nodes (i.e. those with `process.roles=broker,controller`) must list the controller listener here at a minimum.
# If the broker listener is not defined, the default listener will use a host name that is equal to the value of java.net.InetAddress.getCanonicalHostName(),
# with PLAINTEXT listener name, and port 9092.
# FORMAT:
#   listeners = listener_name://host_name:port
# EXAMPLE:
#   listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092,CONTROLLER://:9093

# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT

# The address the broker advertises to clients; remember to change it after distributing
advertised.listeners=PLAINTEXT://node01:9092

# A comma-separated list of the names of the listeners used by the controller.
# If no explicit mapping set in `listener.security.protocol.map`, default will be using PLAINTEXT protocol
# This is required if running in KRaft mode.
controller.listener.names=CONTROLLER

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# Kafka log (data) directory
log.dirs=/tmp/kraft-combined-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################

# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Retention Policy #############################

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
```

```shell
$ xsync $KAFKA_HOME/config/kraft/server.properties
```
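The `controller.quorum.voters` value follows the format `{node.id}@{host}:{port}`, comma-separated, and every node must agree on the same list. A small sketch that just parses the value used above, to make the structure explicit:

```shell
# Parse the quorum string into its id/host/port parts (illustrative).
voters="1@node01:9093,2@node02:9093,3@node03:9093"
echo "$voters" | tr ',' '\n' | while IFS= read -r v; do
  id=${v%%@*}      # everything before the '@'
  hp=${v#*@}       # host:port remainder
  echo "controller id=${id} host=${hp%:*} port=${hp##*:}"
done
# -> controller id=1 host=node01 port=9093 (and likewise for 2 and 3)
```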

Note:

Each machine's node.id must be different; set node01 ~ node03 to 1, 2, and 3 respectively. (The KRaft config uses node.id, not broker.id.)

Each machine's advertised.listeners must also differ; node01 ~ node03 should use PLAINTEXT://node01:9092, PLAINTEXT://node02:9092, and PLAINTEXT://node03:9092 respectively.
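Both per-node values can be patched in one pass after distribution. A minimal sketch under the same hostname assumption as before; the `/tmp` file is a stand-in for the real `$KAFKA_HOME/config/kraft/server.properties`:

```shell
# Illustrative only: patch node.id and advertised.listeners for this host
# in a copy of the KRaft server.properties.
printf 'node.id=1\nadvertised.listeners=PLAINTEXT://node01:9092\n' > /tmp/kraft.properties
host=node03                                  # in practice: host=$(hostname)
id=$((10#${host##*[!0-9]}))                  # node03 -> 3
sed -i -e "s/^node\.id=.*/node.id=${id}/" \
       -e "s#^advertised\.listeners=.*#advertised.listeners=PLAINTEXT://${host}:9092#" \
       /tmp/kraft.properties
cat /tmp/kraft.properties
# prints: node.id=3
#         advertised.listeners=PLAINTEXT://node03:9092
```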

Initialize Kafka

```shell
# generate a unique ID for the storage directories
$ kafka-storage.sh random-uuid
YE6iIE-zT4m45ZdkY3nz1A
# format every node with the ID generated above
$ xcall kafka-storage.sh format -t YE6iIE-zT4m45ZdkY3nz1A -c $KAFKA_HOME/config/kraft/server.properties
```

Start the Kafka Cluster

```shell
$ xcall kafka-server-start.sh -daemon $KAFKA_HOME/config/kraft/server.properties
```

Verification

Same as above.
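One additional sanity check for the KRaft setup: after `kafka-storage.sh format`, each node's log directory should contain a `meta.properties` tying the node to the shared cluster ID. A sketch of its contents (values illustrative, matching the IDs used above):

```properties
# /tmp/kraft-combined-logs/meta.properties (illustrative)
cluster.id=YE6iIE-zT4m45ZdkY3nz1A
node.id=1
version=1
```

Every node must show the same cluster.id but its own node.id; a mismatch means the format step was run with different UUIDs.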

Kafka - Eagle Monitoring

Pick any machine to host Kafka Eagle. Kafka Eagle needs a database; MySQL is used here.

In this guide, both Eagle and MySQL are deployed on node01.

Eagle version: 2.1.0

MySQL version: 8.0.28

MySQL Deployment

Refer to the MySQL deployment in "Hive Setup on Hadoop HA".

Kafka Preparation

Modify kafka-server-start.sh

```shell
$ vim $KAFKA_HOME/bin/kafka-server-start.sh
```

Replace the following (at line 28):

```shell
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
```

with:

```shell
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
    export JMX_PORT="9999"
fi
```

(On JDK 8 the `-XX:PermSize` flag is ignored, since the permanent generation was removed; it is harmless here. `JMX_PORT` is what Eagle will connect to later.)
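The `"x$KAFKA_HEAP_OPTS" = "x"` comparison is just an empty-string guard: the override only fires when the variable is not already set, so an operator can still pass custom heap options from the environment. A minimal simulation of the branch:

```shell
# Simulate the guard in kafka-server-start.sh: with KAFKA_HEAP_OPTS empty,
# the defaults (and JMX_PORT) are applied.
KAFKA_HEAP_OPTS=""
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
  KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G"
  JMX_PORT="9999"
fi
echo "JMX_PORT=$JMX_PORT"   # prints: JMX_PORT=9999
```

If `KAFKA_HEAP_OPTS` were already non-empty, the block would be skipped and `JMX_PORT` would stay unset.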

Distribute to the other nodes:

```shell
$ xsync $KAFKA_HOME/bin/kafka-server-start.sh
```

Restart Kafka:

```shell
$ xcall kafka-server-stop.sh
$ xcall kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
```

KE Deployment

KE Environment Variables

```shell
$ vim /etc/profile
# append at the end:
export KE_HOME=/opt/efak-web-2.1.0
export PATH=$PATH:$KE_HOME/bin

$ source /etc/profile
```

Configure system-config.properties

```shell
$ vim $KE_HOME/conf/system-config.properties
```

```properties
######################################
# multi zookeeper & kafka cluster list
# Settings prefixed with 'kafka.eagle.' will be deprecated, use 'efak.' instead
# ZooKeeper configuration
######################################
efak.zk.cluster.alias=cluster1
cluster1.zk.list=node01:2181,node02:2181,node03:2181/kafka

######################################
# zookeeper enable acl
######################################
cluster1.zk.acl.enable=false
cluster1.zk.acl.schema=digest
cluster1.zk.acl.username=test
cluster1.zk.acl.password=test123

######################################
# broker size online list
######################################
cluster1.efak.broker.size=20

######################################
# zk client thread limit
######################################
kafka.zk.limit.size=16

######################################
# EFAK webui port
######################################
efak.webui.port=8048

######################################
# EFAK enable distributed
######################################
efak.distributed.enable=false
efak.cluster.mode.status=master
efak.worknode.master.host=localhost
efak.worknode.port=8085

######################################
# kafka jmx acl and ssl authenticate
######################################
cluster1.efak.jmx.acl=false
cluster1.efak.jmx.user=keadmin
cluster1.efak.jmx.password=keadmin123
cluster1.efak.jmx.ssl=false
cluster1.efak.jmx.truststore.location=/data/ssl/certificates/kafka.truststore
cluster1.efak.jmx.truststore.password=ke123456

######################################
# kafka offset storage
# where Kafka offsets live:
# in ZK before 0.9, in Kafka from 0.9 on;
# the Kafka deployed above is 3.2.0, so they are stored in Kafka
######################################
cluster1.efak.offset.storage=kafka

######################################
# kafka jmx uri
######################################
cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi

######################################
# kafka metrics, 15 days by default
######################################
efak.metrics.charts=true
efak.metrics.retain=15

######################################
# kafka sql topic records max
######################################
efak.sql.topic.records.max=5000
efak.sql.topic.preview.records.max=10

######################################
# delete kafka topic token
######################################
efak.topic.token=keadmin

######################################
# kafka sasl authenticate
######################################
cluster1.efak.sasl.enable=false
cluster1.efak.sasl.protocol=SASL_PLAINTEXT
cluster1.efak.sasl.mechanism=SCRAM-SHA-256
cluster1.efak.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-eagle";
cluster1.efak.sasl.client.id=
cluster1.efak.blacklist.topics=
cluster1.efak.sasl.cgroup.enable=false
cluster1.efak.sasl.cgroup.topics=
cluster2.efak.sasl.enable=false
cluster2.efak.sasl.protocol=SASL_PLAINTEXT
cluster2.efak.sasl.mechanism=PLAIN
cluster2.efak.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-eagle";
cluster2.efak.sasl.client.id=
cluster2.efak.blacklist.topics=
cluster2.efak.sasl.cgroup.enable=false
cluster2.efak.sasl.cgroup.topics=

######################################
# kafka ssl authenticate
######################################
cluster3.efak.ssl.enable=false
cluster3.efak.ssl.protocol=SSL
cluster3.efak.ssl.truststore.location=
cluster3.efak.ssl.truststore.password=
cluster3.efak.ssl.keystore.location=
cluster3.efak.ssl.keystore.password=
cluster3.efak.ssl.key.password=
cluster3.efak.ssl.endpoint.identification.algorithm=https
cluster3.efak.blacklist.topics=
cluster3.efak.ssl.cgroup.enable=false
cluster3.efak.ssl.cgroup.topics=

######################################
# kafka mysql jdbc driver address
# JDBC connection for the MySQL installed above
######################################
efak.driver=com.mysql.cj.jdbc.Driver
efak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=root
efak.password=123456
```

Start KE

```shell
# start
$ ke.sh start
```

Output like the following indicates a successful start:

```
[2022-05-21 22:22:21] INFO: [Job done!]
Welcome to
    ______    ______    ___     __ __
   / ____/   / ____/   /   |   / //_/
  / __/     / /_      / /| |  / ,<
 / /___    / __/     / ___ | / /| |
/_____/   /_/       /_/  |_|/_/ |_|
( Eagle For Apache Kafka® )

Version 2.1.0 -- Copyright 2016-2022
*******************************************************************
* EFAK Service has started success.
* Welcome, Now you can visit 'http://192.168.128.101:8048'
* Account:admin ,Password:123456
*******************************************************************
* ke.sh [start|status|stop|restart|stats]
* https://www.kafka-eagle.org/
*******************************************************************
```

```shell
# stop
$ ke.sh stop
```

